"""
Copyright 2023 Man Group Operations Limited

Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.

As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""

from typing import Optional, Union
from enum import Enum

from arcticdb.dependencies import _PYARROW_AVAILABLE, _POLARS_AVAILABLE
from arcticdb.dependencies import pyarrow as pa
from arcticdb.encoding_version import EncodingVersion
from arcticdb_ext.storage import ModifiableLibraryOption, ModifiableEnterpriseLibraryOption
from arcticdb_ext.version_store import InternalOutputFormat, InternalArrowOutputStringFormat


DEFAULT_ENCODING_VERSION = EncodingVersion.V1


class LibraryOptions:
    """
    Configuration options for ArcticDB libraries.

    Attributes
    ----------

    dynamic_schema: bool
        See `__init__` for details.
    dedup: bool
        See `__init__` for details.
    rows_per_segment: int
        See `__init__` for details.
    columns_per_segment: int
        See `__init__` for details.
    encoding_version: Optional[EncodingVersion]
        See `__init__` for details.
    recursive_normalizers: bool
        See `__init__` for details.
    """

    def __init__(
        self,
        *,
        dynamic_schema: bool = False,
        dedup: bool = False,
        rows_per_segment: int = 100_000,
        columns_per_segment: int = 127,
        encoding_version: Optional[EncodingVersion] = None,
        recursive_normalizers: bool = False,
    ):
        """
        Parameters
        ----------

        dynamic_schema: bool, default False
            Controls whether the library supports dynamically changing symbol schemas.

            The schema of a symbol refers to the order of the columns and the type of the columns.

            If False, then the schema for a symbol is set on each `write` call, and cannot then be
            modified by successive updates or appends. Each successive update or append must contain the same column set
            in the same order with the same types as the initial write.

            When disabled, ArcticDB will tile stored data across both the rows and columns. This enables highly efficient
            retrieval of specific columns regardless of the total number of columns stored in the symbol.

            If True, then updates and appends can contain columns not originally seen in the
            most recent write call. The data will be dynamically backfilled on read when required for the new columns.
            Furthermore, ArcticDB supports numeric type promotions should the type of a column change - for example,
            if column A is of type int32 on write, and of type float on the next append, the column will be returned as
            a float to Pandas on read. Supported promotions include (narrower) integer to (wider) integer, and integer to float.

            When enabled, ArcticDB will only tile across the rows of the data. This will result in slower column
            subsetting when storing a large number of columns (>1,000).
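            Example (an illustrative sketch; assumes an `Arctic` instance `ac` and pandas imported as `pd`):
                lib = ac.create_library("dyn_lib", LibraryOptions(dynamic_schema=True))
                lib.write("sym", pd.DataFrame({"a": [1, 2]}, index=pd.date_range("2024-01-01", periods=2)))
                # The append introduces a new column "b"; on read, "b" is backfilled for the earlier rows
                lib.append("sym", pd.DataFrame({"a": [3], "b": [4.0]}, index=pd.date_range("2024-01-03", periods=1)))
                lib.read("sym").data  # column "b" is backfilled (NaN) for the first two rows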

        dedup: bool, default False
            Controls whether calls to write and write_batch will attempt to deduplicate data segments against the
            previous live version of the specified symbol.

            If False, new data segments will always be written for the new version of the symbol being created.

            If True, the content hash, start index, and end index of data segments associated with the previous live
            version of this symbol will be compared with those about to be written, and will not be duplicated in the
            storage device if they match.

            Keep in mind that this is most effective when version n is equal to version n-1 plus additional data at the
            end - and only at the end! If additional data is inserted at the start or into the middle, then
            all segments occurring after that modification will almost certainly differ. ArcticDB creates new segments at
            fixed intervals and data is only de-duplicated if the hashes of the data segments are identical. A one-row
            offset will therefore prevent this de-duplication.

            Note that these conditions will also be checked with write_pickle and write_pickle_batch. However, pickled
            objects are always written as a single data segment, and so dedup will only occur if the written object is
            identical to the previous version.
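            Example (an illustrative sketch; assumes an `Arctic` instance `ac` and a dataframe `df`):
                lib = ac.create_library("dedup_lib", LibraryOptions(dedup=True))
                lib.write("sym", df)  # all data segments are written
                lib.write("sym", df)  # identical segments are not re-written to storage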

        rows_per_segment: int, default 100_000
            Together with columns_per_segment, controls how data being written, appended, or updated is sliced into
            separate data segment objects before being written to storage.

            By splitting data across multiple objects in storage, calls to read and read_batch that include the
            date_range and/or columns parameters can reduce the amount of data read from storage by only reading those
            data segments that contain data requested by the reader.

            For example, if writing a dataframe with 250,000 rows and 200 columns, by default, this will be sliced into
            6 data segments:
            1 - rows 1-100,000 and columns 1-127
            2 - rows 100,001-200,000 and columns 1-127
            3 - rows 200,001-250,000 and columns 1-127
            4 - rows 1-100,000 and columns 128-200
            5 - rows 100,001-200,000 and columns 128-200
            6 - rows 200,001-250,000 and columns 128-200

            Data segments that cover the same range of rows are said to belong to the same row-slice (e.g. segments 2
            and 5 in the example above). Data segments that cover the same range of columns are said to belong to the
            same column-slice (e.g. segments 2 and 3 in the example above).

            Note that this slicing is only applied to the new data being written; existing data segments from previous
            versions that do not need to change will not be modified. For example, if a 50,000 row dataframe with a single
            column is written, and then another dataframe also with 50,000 rows and one column is appended to it, there
            will still be two data segments, each with 50,000 rows.

            Note that for libraries with dynamic_schema enabled, columns_per_segment does not apply, and there is always
            a single column-slice. However, rows_per_segment is used, and there will be multiple row-slices.
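            Example (an illustrative sketch; assumes an `Arctic` instance `ac`):
                # Larger segments mean fewer objects in storage, but less granular date_range/columns filtering on read
                lib = ac.create_library("my_lib", LibraryOptions(rows_per_segment=250_000, columns_per_segment=255))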

        columns_per_segment: int, default 127
            See rows_per_segment

        encoding_version: Optional[EncodingVersion], default None
            The encoding version to use when writing data to storage.
            v2 is faster, but still experimental, so use with caution.

        recursive_normalizers: bool, default False
            Whether to recursively normalize nested data structures when writing sequence-like or dict-like data.
            The data structure can be nested or a mix of lists and dictionaries.
            Note: If the leaf nodes cannot be natively normalized and must be written using write_pickle, those leaf nodes
            will be pickled, resulting in the overall data being only partially normalized and partially pickled.
            Example:
                data = {"a": np.arange(5), "b": pd.DataFrame({"col": [1, 2, 3]})}
                lib = ac.create_library(lib_name)
                lib.write(symbol, data) # ArcticUnsupportedDataTypeException will be thrown by default
                lib2 = ac.create_library(another_lib_name, LibraryOptions(recursive_normalizers=True))
                lib2.write(symbol, data) # data will be successfully written
        """
        self.dynamic_schema = dynamic_schema
        self.dedup = dedup
        self.rows_per_segment = rows_per_segment
        self.columns_per_segment = columns_per_segment
        self.encoding_version = encoding_version
        self.recursive_normalizers = recursive_normalizers

    def __eq__(self, right):
        return (
            self.dynamic_schema == right.dynamic_schema
            and self.dedup == right.dedup
            and self.rows_per_segment == right.rows_per_segment
            and self.columns_per_segment == right.columns_per_segment
            and self.encoding_version == right.encoding_version
            and self.recursive_normalizers == right.recursive_normalizers
        )

    def __repr__(self):
        return (
            f"LibraryOptions(dynamic_schema={self.dynamic_schema}, dedup={self.dedup},"
            f" rows_per_segment={self.rows_per_segment}, columns_per_segment={self.columns_per_segment},"
            f" encoding_version={self.encoding_version if self.encoding_version is not None else 'Default'},"
            f" recursive_normalizers={self.recursive_normalizers})"
        )


# TODO: Use enum.StrEnum when we no longer need to support python 3.9
class OutputFormat(str, Enum):
    """
    Controls the output format of operations which return dataframes. All APIs which take an `output_format` argument
    accept the enum values and case-insensitive strings. E.g. all of `OutputFormat.PYARROW`, `"PYARROW"`, `"pyarrow"`
    will be interpreted as `OutputFormat.PYARROW`.

    PANDAS (default):
        Dataframes are returned as `pandas.DataFrame` or `pandas.Series` objects backed by numpy arrays.

    PYARROW:
        Dataframes are returned as `pyarrow.Table` objects using Apache Arrow's columnar memory format.
        Provides better performance than `PANDAS`, especially for dataframes containing many string columns.
        String format can be customized via `ArrowOutputStringFormat`.

    POLARS:
        Dataframes are returned as `polars.DataFrame` objects using Apache Arrow's columnar memory format.
        Provides better performance than `PANDAS`, especially for dataframes containing many string columns.
        String format can be customized via `ArrowOutputStringFormat`.
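
    Example (an illustrative sketch; assumes a library handle `lib` whose `read` call accepts an
    `output_format` argument):
        lib.read("sym", output_format=OutputFormat.PYARROW)  # data returned as a pyarrow.Table
        lib.read("sym", output_format="pyarrow")             # equivalent, case-insensitive string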
    """

    PANDAS = "PANDAS"
    PYARROW = "PYARROW"
    POLARS = "POLARS"


def output_format_to_internal(output_format: Union[OutputFormat, str]) -> InternalOutputFormat:
    if output_format.lower() == OutputFormat.PANDAS.lower():
        return InternalOutputFormat.PANDAS
    elif output_format.lower() == OutputFormat.PYARROW.lower():
        if not _PYARROW_AVAILABLE:
            raise ModuleNotFoundError(
                "ArcticDB's pyarrow optional dependency missing but is required to use arrow output format."
            )
        return InternalOutputFormat.ARROW
    elif output_format.lower() == OutputFormat.POLARS.lower():
        if not _PYARROW_AVAILABLE or not _POLARS_AVAILABLE:
            raise ModuleNotFoundError(
                "ArcticDB's pyarrow or polars optional dependencies are missing but are required to use polars output format."
            )
        return InternalOutputFormat.ARROW
    else:
        raise ValueError(f"Unknown OutputFormat: {output_format}")


class ArrowOutputStringFormat(str, Enum):
    """
    Controls the string column format when using `PYARROW` or `POLARS` output formats.
    Accepts either the enum value or the corresponding `pyarrow.DataType`.

    LARGE_STRING (default):
        Uses 64-bit variable-size encoding.
        PyArrow: `pa.large_string()`, Polars: `pl.String`
        Supports up to 2⁶³-1 bytes total string length per Arrow array.
        Best for general-purpose use and when working with large string data.

    SMALL_STRING:
        Uses 32-bit variable-size encoding.
        PyArrow: `pa.string()`, Polars: Not supported
        Supports up to 2³¹-1 bytes total string length per Arrow array.
        Only supported with PyArrow because Polars does not support small strings.
        Slightly more memory efficient than `LARGE_STRING` when string data is known to be small.

    CATEGORICAL and DICTIONARY_ENCODED:
        Both are aliases for dictionary-encoded strings with int32 indices.
        PyArrow: `pa.dictionary(pa.int32(), pa.large_string())`, Polars: `pl.Categorical`
        Best for columns with low cardinality (few unique values repeated many times).
        Deduplicates strings, reducing memory usage and improving performance when the number of
        unique values is much smaller than the total number of rows.

    For more details on physical layouts, see the Apache Arrow specification:
    https://arrow.apache.org/docs/format/Columnar.html
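
    Example (an illustrative sketch using `RuntimeOptions` from this module; wherever an `ArrowOutputStringFormat`
    is accepted, the matching `pyarrow.DataType` listed above can be passed instead):
        opts = RuntimeOptions(
            output_format=OutputFormat.PYARROW,
            arrow_string_format_default=ArrowOutputStringFormat.CATEGORICAL,
        )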
    """

    CATEGORICAL = "CATEGORICAL"
    DICTIONARY_ENCODED = "DICTIONARY_ENCODED"
    LARGE_STRING = "LARGE_STRING"
    SMALL_STRING = "SMALL_STRING"


def arrow_output_string_format_to_internal(
    arrow_string_format: Union[ArrowOutputStringFormat, "pa.DataType"], output_format: Union[OutputFormat, str]
) -> InternalArrowOutputStringFormat:
    if (
        arrow_string_format == ArrowOutputStringFormat.CATEGORICAL
        or arrow_string_format == ArrowOutputStringFormat.DICTIONARY_ENCODED
        or (_PYARROW_AVAILABLE and arrow_string_format == pa.dictionary(pa.int32(), pa.large_string()))
    ):
        return InternalArrowOutputStringFormat.CATEGORICAL
    elif arrow_string_format == ArrowOutputStringFormat.LARGE_STRING or (
        _PYARROW_AVAILABLE and arrow_string_format == pa.large_string()
    ):
        return InternalArrowOutputStringFormat.LARGE_STRING
    elif arrow_string_format == ArrowOutputStringFormat.SMALL_STRING or (
        _PYARROW_AVAILABLE and arrow_string_format == pa.string()
    ):
        if output_format.lower() == OutputFormat.POLARS.lower():
            raise ValueError(
                "SMALL_STRING is not supported with POLARS output format. Please use LARGE_STRING instead."
            )
        return InternalArrowOutputStringFormat.SMALL_STRING
    else:
        raise ValueError(f"Unkown ArrowOutputStringFormat: {arrow_string_format}")


class RuntimeOptions:
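    """
    Options that control how data is returned to the user at runtime: the `OutputFormat` used for read operations
    and the default `ArrowOutputStringFormat` used for string columns when an Arrow-based output format is selected.
    """
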
    def __init__(
        self,
        *,
        output_format: Union[OutputFormat, str] = OutputFormat.PANDAS,
        arrow_string_format_default: ArrowOutputStringFormat = ArrowOutputStringFormat.LARGE_STRING,
    ):
        self.output_format = output_format
        self.arrow_string_format_default = arrow_string_format_default

    def set_output_format(self, output_format: Union[OutputFormat, str]):
        self.output_format = output_format

    def set_arrow_string_format_default(self, arrow_string_format_default: ArrowOutputStringFormat):
        self.arrow_string_format_default = arrow_string_format_default


class EnterpriseLibraryOptions:
    """
    Configuration options for ArcticDB libraries that should only be used with the ArcticDB Enterprise
    features.

    Contact `info@arcticdb.io` for more information about the Enterprise features.

    Attributes
    ----------

    replication: bool
        See `__init__` for details.

    background_deletion: bool
        See `__init__` for details.
    """

    def __init__(
        self,
        *,
        replication: bool = False,
        background_deletion: bool = False,
    ):
        """
        Parameters
        ----------

        replication: bool, default False

            Whether to use the replication feature for this library. The replication tool is an enterprise tool that
            is used for one-way replication of the contents of a library to an arbitrary number of secondary storages.

            When enabled, modifications to this library will also write an oplog message that will be consumed by the
            replication tool.

            When using this option, replication target libraries _must_ have background_deletion=True. We recommend
            that source libraries also have background_deletion=True as this improves replication performance.

        background_deletion: bool, default False
            Whether to use background deletion for this library. The background deletion tool is an enterprise tool
            that performs data deletion in the background, so that writers to an ArcticDB library do not need to spend
            time doing the deletion themselves.

            When enabled, deletions and version pruning on this library will only logically delete data. The background
            deletion tool is responsible for the physical deletion.

            This flag does not affect the logical semantics of the ArcticDB APIs. Readers will see data in the same
            state following a `delete` or `prune_previous_versions` call regardless of whether this flag is set or not.
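
            Example (an illustrative sketch; assumes an `Arctic` instance `ac` whose `create_library` accepts an
            `enterprise_library_options` argument):
                ac.create_library(
                    "replicated_lib",
                    enterprise_library_options=EnterpriseLibraryOptions(replication=True, background_deletion=True),
                )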
        """
        self.replication = replication
        self.background_deletion = background_deletion

    def __eq__(self, right):
        return self.replication == right.replication and self.background_deletion == right.background_deletion

    def __repr__(self):
        return (
            f"EnterpriseLibraryOptions(replication={self.replication}, background_deletion={self.background_deletion})"
        )
